package com.shujia.sql

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object Demo8Readhbase {
  def main(args: Array[String]): Unit = {
    val setting: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    val table: TableEnvironment = TableEnvironment.create(setting)

    /**
      * 读写hbase的建表语句是一样的
      *
      */

    table.executeSql(
      """
        |CREATE TABLE hbase_student (
        | id STRING,
        | info ROW<name STRING,age BIGINT,gender STRING,clazz STRING>,
        | PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        | 'connector' = 'hbase-1.4',
        | 'table-name' = 'student', -- hbase中的表名
        | 'zookeeper.quorum' = 'master:2181' -- zookeeper地址
        |)
        |
      """.stripMargin)

    table.executeSql(
      """
        |CREATE TABLE print_table
        |WITH ('connector' = 'print')
        |LIKE hbase_student (EXCLUDING ALL)
        |
      """.stripMargin)

    table.executeSql(
      """
        |insert into print_table
        |select * from hbase_student
        |
      """.stripMargin)
  }

}
